
#############################################################################
## $Id: WorkQueue.pm 15615 2013-04-01 21:53:31Z spadkins $
#############################################################################

package App::Context;

sub work_queue {
    my ($self, $service_name, @args) = @_;
    return($self->service("WorkQueue", $service_name, @args));
}

package App::WorkQueue;

use App;
use App::Service;
@ISA = ( "App::Service" );

use Data::Dumper;
$Data::Dumper::Terse = 1;

use strict;

my $verbose = $App::options{verbose};
$verbose = 0 if (!$verbose || $verbose < 5);

=head1 NAME

App::WorkQueue - Interface for a work queue

=head1 SYNOPSIS

    use App;

    $context = App->context();
    $q = $context->service("WorkQueue"); 

    ...

=head1 DESCRIPTION

A WorkQueue is for channeling work from a variety of requesting sources to a 
limited set of workers who work the queue.

It implements a new service in the App-Context framework called a
WorkQueue.  A normal queue supports push() and pop() operations
where the entries push()ed onto the queue come out in a first-in-
first-out (FIFO) basis.  A WorkQueue however is used to control the
processing of units of work, where the order that entries are
processed is prioritized subject to various constraints.

Therefore, a WorkQueue supports the following operations.

  push()      - add a unit of work to the queue
  acquire()   - get the highest priority entry subject to constraints
  release()   - remove an (acquired) entry from the queue
  unacquire() - return an (acquired) entry to the unacquired state

Internally, the WorkQueue can hold HASHREF's (the default) or ARRAYREF's. 
However, if ARRAYREF's are to be stored, then a {columns} array and a
{colidx} hash must also be stored on the object.

=head1 INTERNAL ELEMENTS

 $self->{data} = [];
 $self->{columns} = [];
 $self->{colidx} = {};

=cut

#############################################################################
# CONSTANTS
#############################################################################
my $GCONSTR_COUNTS       = 0;
my $GCONSTR_LIMITS       = 1;
my $GCONSTR_COUNT_ATTRIB = 2;
my $GCONSTR_LIMIT_ATTRIB = 3;
my $GCONSTR_VALUE_ATTRIB = 4;
my $GCONSTR_HARD_LIMIT   = 5;

my $CONSTR_COUNTS        = 0;
my $CONSTR_LIMITS        = 1;
my $CONSTR_KEY_ATTRIB    = 2;
my $CONSTR_COUNT_ATTRIB  = 3;
my $CONSTR_KEY_IDX       = 4;
my $CONSTR_COUNT_IDX     = 5;

#############################################################################
# CONSTRUCTOR METHODS
#############################################################################

=head1 Constructor Methods:

#############################################################################
# Method: _init()
#############################################################################

=head2 _init()

The _init() method is called from within the standard Service
constructor.
It allows a WorkQueue to customize the behavior of the
constructor. 

    * Signature: _init($named)
    * Param:     $named      {}   [in]
    * Return:    void

    Sample Usage: 

    $service->_init(\%args);

=cut

sub _init {
    &App::sub_entry if ($App::trace);
    my ($self, $args) = @_;

    $self->SUPER::_init($args);
    $self->{data} = [];
    if (!$self->{type}) {
        $self->{type} = $self->{queue_type} || ($self->{columns} ? "ARRAY" : "HASH");
    }

    die "status_attrib not set on queue"     if (!$self->{status_attrib});
    #die "STATUS_UNBUFFERED not set on queue" if (!$self->{STATUS_UNBUFFERED});
    die "STATUS_UNACQUIRED not set on queue" if (!$self->{STATUS_UNACQUIRED});
    die "STATUS_ACQUIRED not set on queue"   if (!$self->{STATUS_ACQUIRED});
    die "STATUS_RELEASED not set on queue"   if (!$self->{STATUS_RELEASED});
    die "id_attrib or auto_id_attrib not set on queue" if (!$self->{id_attrib} && !$self->{auto_id_attrib});

    my $id_attrib = $self->{id_attrib};
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $id_attribs = [split(/,/,$id_attrib)];
        my (@id_idx);
        foreach my $attrib (@$id_attribs) {
            push(@id_idx, $colidx->{$attrib});
        }
        $self->{id_attribs} = $id_attribs;
        $self->{id_indexes} = \@id_idx;
    }
    else {
        $self->{id_attribs} = [split(/,/,$id_attrib)];
    }

    if ($self->{auto_id_attrib}) {
        if ($self->{type} eq "ARRAY") {
            my $colidx  = $self->_colidx();
            my $attrib  = $self->{auto_id_attrib};
            my $columns = $self->{columns};
            if (defined $colidx->{$attrib}) {
                $self->{auto_id_idx} = $colidx->{$attrib};
            }
            else {
                push(@$columns, $attrib);
                $colidx->{$attrib} = $#$columns;
                $self->{auto_id_idx} = $#$columns;
            }
        }
    }

    $self->{status_counts} = {};

    if ($self->{sort_spec}) {
        $self->_analyze_sort_spec();
    }

    $self->{verbose} = $verbose;
    &App::sub_exit() if ($App::trace);
}

sub shutdown {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    &App::sub_exit() if ($App::trace);
}

=cut

#############################################################################
# new()
#############################################################################

=head2 new()

The constructor is inherited from
L<C<App::Service>|App::Service/"new()">.

=cut

#############################################################################
# PUBLIC METHODS
#############################################################################

=head1 Public Methods:

Unlike a regular queue where the primitive operations are push()/pop(), the
primitive operations for this work queue are:

 * $q->push($entry)
 * $entry = $q->acquire()
 * $q->release($entry)
 * $q->unacquire($entry)
 * @entries = $q->locate(\%params, \%options)

=cut

#############################################################################
# push()
#############################################################################

=head2 push()

    * Signature: $q->push($entry);
    * Param:     $entry             HASH/ARRAY
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();

    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });

=cut

sub push {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    # We explicitly _update_ref() rather than update() because the entry is not
    # in the remote store yet.
    $self->_update_ref($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
    die "push() not implemented";
    &App::sub_exit() if ($App::trace);
}

sub _push_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $release_lowest) = @_;
    my $ref = ref($entry);
    my $type = "";
    my $status_attrib = $self->{status_attrib};
    my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
    my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
    my $colidx;
    my $status_idx;
    if ($ref eq "ARRAY") {
        $type = "ARRAY";
        print "PUSHED[M]: [", join("|",@$entry), "]\n" if ($verbose);
        $colidx = $self->_colidx();
        $status_idx = $colidx->{$status_attrib};
    }
    elsif ($ref) {
        $type = "HASH";
        print "PUSHED[M]: {", join("|",%$entry), "}\n" if ($verbose);
    }
    die "tried to push entry of type [$type] onto queue of type [$self->{type}]" if ($type ne $self->{type});
    my $num_added = 0;
    my $entries = $self->{data};
    my ($removed_entry);
    if (!$self->{sort_spec} || $#$entries == -1) {
        if ($release_lowest) {
            # do nothing. this entry is the "lowest", so we shouldn't even push it on.
        }
        else {
            CORE::push(@$entries, $entry);
            $self->update($entry,[$status_attrib],[$STATUS_UNACQUIRED]);
            $num_added = 1;
        }
    }
    else {
        my $inserted = 0;
        if ($release_lowest) {
            my $removed = 0;
            my ($ent, $cmp);
            my $resource_key = $self->_resource_key($entry);
            for (my $i = $#$entries; $i >= 0; $i--) {
                $ent = $entries->[$i];
                if ($self->_resource_key($ent) eq $resource_key) {
                    # IF not inserted yet AND new entry is lower or same priority THEN
                    $cmp = $self->_compare_entries($entry, $ent);  # a "-1" means higher priority
                    if (!$inserted && $cmp > -1) {   # new $entry is *not* higher priority
                        if (!$removed) {   # "insert and remove self"
                            $inserted = 1;
                            $removed  = 1;
                        }
                        else {
                            $self->update($entry,[$status_attrib],[$STATUS_UNACQUIRED]);
                            splice(@$entries, $i+1, 0, $entry);
                            $num_added++;
                            $inserted = 1;
                        }
                    }
                    my $acquired;
                    if ($ref eq "ARRAY") {
                        $acquired = $ent->[$status_idx] eq $STATUS_ACQUIRED;
                    }
                    elsif ($ref) {
                        $acquired = $ent->{$status_attrib} eq $STATUS_ACQUIRED;
                    }
                    if (!$removed && !$acquired) {
                        $removed_entry = $entries->[$i];
                        $self->update($removed_entry,[$status_attrib],[$STATUS_UNBUFFERED]);
                        splice(@$entries, $i, 1);
                        $num_added--;
                        $removed = 1;
                    }
                }
                else { # this block, we are certain we are not comparing to ourselves
                    if ($removed && !$inserted && $self->_compare_entries($entry, $ent) > -1) {
                        $self->update($entry,[$status_attrib],[$STATUS_UNACQUIRED]);
                        splice(@$entries, $i+1, 0, $entry);
                        $num_added++;
                        $inserted = 1;
                    }
                }
                last if ($inserted && $removed);
            }
        }
        else {
            my ($cmp);
            for (my $i = $#$entries; $i >= 0; $i--) {
                $cmp = $self->_compare_entries($entry, $entries->[$i]);
                if ($cmp == 0) {  # it's identical. don't add it again.
                    $inserted = 1;
                    last;
                }
                elsif ($cmp == 1) { # it's lower priority (and different)
                    $self->update($entry,[$status_attrib],[$STATUS_UNACQUIRED]);
                    splice(@$entries, $i+1, 0, $entry);
                    $num_added++;
                    $inserted = 1;
                    last;
                }
            }
        }
        if (!$inserted) {
            $self->update($entry,[$status_attrib],[$STATUS_UNACQUIRED]);
            unshift(@$entries, $entry);
            $num_added++;
        }
    }

    &App::sub_exit($num_added) if ($App::trace);
    return($num_added);
}

sub _analyze_sort_spec {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $sort_spec = $self->{sort_spec};
    if ($sort_spec) {
        my $sort_columns = [];
        my $sort_idx     = [];
        my $direction    = [];
        my $numeric      = [];
        my $colidx       = $self->_colidx();
        my @sort_spec    = split(/,/, $sort_spec);
        foreach my $col (@sort_spec) {
            if ($col =~ /^([NC]?)([+-]?)(.*)$/) {
                CORE::push(@$direction,    (($2 eq "-") ? -1 : 1));
                CORE::push(@$numeric,      (($1 eq "N") ? 1 : 0));
                CORE::push(@$sort_columns, $3);
                if ($self->{type} eq "ARRAY") {
                    CORE::push(@$sort_idx, $colidx->{$3});
                }
            }
            else {
                CORE::push(@$sort_columns, $col);
                CORE::push(@$direction,    1);
                CORE::push(@$numeric,      0);
            }
        }
        $self->{sort_columns} = $sort_columns;
        $self->{sort_idx}     = $sort_idx;
        $self->{direction}    = $direction;
        $self->{numeric}      = $numeric;
    }
    else {
        $self->{sort_columns} = [];
        $self->{sort_idx}     = [];
        $self->{direction}    = [];
        $self->{numeric}      = [];
    }
    &App::sub_exit() if ($App::trace);
}

# for a sorting function, a $sign=-1 means the first entry ($a) is "smaller" so sorts earlier (higher priority)
# for a sorting function, a $sign=1  means the first entry ($a) is "larger"  so sorts later   (lower priority)
# so (1 <=> 2) = -1
#    (2 <=> 2) = 0
#    (2 <=> 1) = 1
sub _compare_entries {
    &App::sub_entry if ($App::trace);
    my ($self, $a, $b) = @_;
    my $verbose = $self->{verbose};
    my $sign = 0;
    my $columns   = $self->{sort_columns};
    my $sort_idx  = $self->{sort_idx};
    my $direction = $self->{direction};
    my $numeric   = $self->{numeric};
    if (ref($a) eq "ARRAY") {
        for (my $c = 0; $c <= $#$sort_idx; $c++) {
            if ($numeric && $numeric->[$c]) {
                $sign = ($a->[$sort_idx->[$c]] <=> $b->[$sort_idx->[$c]]) * $direction->[$c];
                #print "N: $sign = ($a->[$sort_idx->[$c]] <=> $b->[$sort_idx->[$c]]) * $direction->[$c];\n";
            }
            else {
                $sign = ($a->[$sort_idx->[$c]] cmp $b->[$sort_idx->[$c]]) * $direction->[$c];
                #print "C: $sign = ($a->[$sort_idx->[$c]] <=> $b->[$sort_idx->[$c]]) * $direction->[$c];\n";
            }
            # print "$sign = ($a->[$sort_idx->[$c]] cmp $b->[$sort_idx->[$c]]) * $direction->[$c];\n";
            last if ($sign);
        }
        #if ($verbose) {
        #    print "compare [$sign] [", join("|",@$a), "]\n";
        #    print "            [", join("|",@$b), "]\n";
        #}
    }
    else {
        for (my $c = 0; $c <= $#$columns; $c++) {
            if ($numeric && $numeric->[$c]) {
                $sign = ($a->{$columns->[$c]} <=> $b->{$columns->[$c]}) * $direction->[$c];
            }
            else {
                $sign = ($a->{$columns->[$c]} cmp $b->{$columns->[$c]}) * $direction->[$c];
            }
            last if ($sign);
        }
        #if ($verbose) {
        #    print "compare [$sign] {", join("|",%$a), "}\n";
        #    print "            {", join("|",%$b), "}\n";
        #}
    }
    &App::sub_exit($sign) if ($App::trace);
    return($sign);
}

#############################################################################
# acquire()
#############################################################################

=head2 acquire()

    * Signature: $entry = $q->acquire();
    * Param:     $sort_spec         string
    * Return:    $entry             HASH/ARRAY

    Sample Usage: 

    my $entry = $q->acquire();

=cut

sub acquire {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $entry = undef;
    die "acquire() not implemented";
   &App::sub_exit($entry) if ($App::trace);
   return($entry);
}

sub _acquire_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $context = $self->{context};
    my ($entry);
    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
    my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
    my $status_attrib     = $self->{status_attrib};
    my $entries           = $self->{data};

    if ($self->num_entries() > 0 && $self->_global_resources_exist()) {
        ### If $entries is empty, reload because new subrequests
        ### have made it before our regular heartbeat has been scheduled,
        ### so num_entries returns with non-zero values, but there aren't any
        ### in buffers yet.
        ### This used to result in the "Acquire [M]: undef" errors in the log file
        if (!$entries || @$entries == 0) {
            $self->_maintain_queue_buffers();
        }
        if ($self->{type} eq "ARRAY") {
            my $colidx = $self->_colidx();
            my $status_idx = $colidx->{$status_attrib};
            die "status_attribute [$status_attrib] does not exist on elements of this queue" if (!defined $status_idx);
            my ($acquired);
            foreach my $e (@$entries) {
                # print ">>> ENTRY: status=[$e->[$status_idx]] idx=[$status_idx] (should be $STATUS_UNACQUIRED)\n";
                next if ($e->[$status_idx] ne $STATUS_UNACQUIRED);
                if ($self->_acquire_resources($e)) {
                    $entry = $e;
                    $acquired = $self->_acquire_entry($entry);
                    if ($acquired) {
                        $context->log({level=>3}, "$self->{name} : Acquired[M]: [", join("|",@$e), "]\n");
                        last;
                    }
                    else {
                        $self->_release_resources($entry);
                    }
                }
            }
        }
        else {
            my ($acquired, %unacquirable, $resource_key);
            foreach my $e (@$entries) {
                next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
                if (!$e || !%$e) {
                    $context->log({level=>1}, "$self->{name} : Empty entry [$e]\n");
                }
                else {
                    $resource_key = $self->_resource_key($e);
                    if (!$unacquirable{$resource_key}) {
                        if ($self->_acquire_resources($e)) {
                            #my $dump_entry = "";
                            #$dump_entry .= "e (pristine) : " . Dumper($e);
                            $entry = $e;
                            $acquired = $self->_acquire_entry($entry);
                            #$dump_entry .= "entry (after _acquire_entry) : " . Dumper($entry);
                            #$dump_entry .= "acquired[$acquired]\n";
                            if ($acquired) {
                                $context->log({level=>3}, "$self->{name} : _acquire_in_mem: [" . join("|", $e->{shop_request_id}, $e->{subrequest_id}). "]\n");
                                last;
                            }
                            else {
                                ### THIS SHOULD NEVER HAPPEN, SOMEDAY FIGURE OUT WHY
                                $self->_release_resources($entry);
                                #$dump_entry .= "entry (after _release_resources) : " . Dumper($entry);
                                #if ($self->can("_db")) {
                                #    ### TODO: this is a debugging hack that shouldn't stay in it
                                #    my $db = $self->_db();
                                #    #$dump_entry .= "last sql stmt[$db->{sql}]\n";
                                #    #$context->log({level=>5}, "$self->{name} : Acquisition failed : $dump_entry\n");
                                #}
                                $entry = undef;
                            }
                        }
                        else {
                            $unacquirable{$resource_key} = 1;
                        }
                    }
                }
            }
        }
        ### This has been getting logged way too much,
        ### turns out this situation happens when we can't get a resource_key allocation
        ### for any entry because the throttles are all full
        #if (!$entry) {
        #    $context->log({level=>2}, "$self->{name} : _acquire_in_mem: undef\n");
        #}
    }

    &App::sub_exit($entry) if ($App::trace);
    return($entry);
}

sub _acquire_entry {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    my $acquired = 1;
    $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
    &App::sub_exit($acquired) if ($App::trace);
    return($acquired);
}

sub acquired_entries {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my (@entries);
    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
    my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
    my $status_attrib     = $self->{status_attrib};
    my $entries           = $self->{data};

    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        foreach my $entry (@$entries) {
            if ($entry->[$status_idx] ne $STATUS_UNACQUIRED && $entry->[$status_idx] ne $STATUS_UNBUFFERED) {
                CORE::push(@entries, $entry);
            }
        }
    }
    else {
        foreach my $entry (@$entries) {
            if ($entry->{$status_attrib} ne $STATUS_UNACQUIRED && $entry->{$status_attrib} ne $STATUS_UNBUFFERED) {
                CORE::push(@entries, $entry);
            }
        }
    }

    &App::sub_exit(\@entries) if ($App::trace);
    return(\@entries);
}

#############################################################################
# release()
#############################################################################

=head2 release()

    * Signature: $q->release($entry);
    * Signature: $q->release($entry, $columns, $values);
    * Param:     $entry             HASH/ARRAY
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->release({ name => "Joe", degrees => 3 });
    $q->release({ name => "Mike", degrees => 1 });

=cut

sub release {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values) = @_;
    die "release() not implemented";
    &App::sub_exit() if ($App::trace);
}

sub _release_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values) = @_;

    my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
    my $STATUS_RELEASED   = $self->{STATUS_RELEASED};
    my $status_attrib     = $self->{status_attrib};
    my $data              = $self->{data};
    my $context           = $self->{context};
    my ($e, $ent, $entry_key);

    my @columns = ( $self->{status_attrib} );
    my @values  = ( $self->{STATUS_RELEASED} );
    if ($columns) {
        CORE::push(@columns, @$columns);
        CORE::push(@values,  @$values);
    }

    my $released = 0;
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        die "status_attribute [$status_attrib] does not exist on elements of this queue" if (!defined $status_idx);
        $entry_key = $self->_array_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_array_to_key($ent) eq $entry_key) {
                if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->update($ent,\@columns,\@values);
                splice(@$data, $e, 1);
                print "RELEASED[M]: [", join("|",@$entry), "]\n" if ($verbose);
                $released = 1;
                last;
            }
        }
    }
    else {
        $entry_key = $self->_hash_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_hash_to_key($ent) eq $entry_key) {
                if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                else {
                    $context->log({level=>1},"Release with entry status[$ent->{$status_attrib}] not equal STATUS_ACQUIRED[$STATUS_ACQUIRED] : actual/group data_source[$entry->{actual_data_source}/$entry->{group_data_source}]\n");
                }
                my $nrows = $self->update($ent,\@columns,\@values);
                if (!$nrows) {
                    $context->log({level=>1},"Update of db failed : " . join("|", %$ent) . "\n");
                }
                splice(@$data, $e, 1);
                print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
                $released = 1;
                last;
            }
        }
    }

    $context->log({level=>3},"Release[$released] key[$entry_key] entry[$entry->{subrequest_id} $entry->{actual_data_source}]\n");
    if (!$released) {
        $context->log({level=>1},"Not released: entry_keys[" . join(" ", sort(map {$self->_hash_to_key($_)} @$data)) . "]\n");
    }

    &App::sub_exit($released) if ($App::trace);
    return($released);
}

sub _array_to_key {
    my ($self, $entry) = @_;
    my ($key);
    my $auto_id_idx = $self->{auto_id_idx};
    if (defined $auto_id_idx) {
        $key = $entry->[$auto_id_idx];
    }
    else {
        my $id_indexes = $self->{id_indexes};
        if ($#$id_indexes == 0) {
            $key = $entry->[$id_indexes->[0]];
        }
        else {
            $key = join(":", @{$entry}[@$id_indexes]);
        }
    }
    return($key);
}

sub _hash_to_key {
    my ($self, $entry) = @_;
    my ($key);
    my $auto_id_attrib = $self->{auto_id_attrib};
    if (defined $auto_id_attrib) {
        $key = $entry->{$auto_id_attrib};
    }
    else {
        my $id_attribs = $self->{id_attribs};
        if ($#$id_attribs == 0) {
            $key = $entry->{$id_attribs->[0]};
        }
        else {
            $key = join(":", @{$entry}{@$id_attribs});
        }
    }
    return($key);
}

#############################################################################
# unacquire()
#############################################################################

=head2 unacquire()

    * Signature: $q->unacquire($entry);
    * Param:     $entry             HASH/ARRAY
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->unacquire({ name => "Joe", degrees => 3 });
    $q->unacquire({ name => "Mike", degrees => 1 });

=cut

sub unacquire {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    die "unacquire() not implemented";
    &App::sub_exit() if ($App::trace);
}

sub _unacquire_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;

    my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
    my $status_attrib     = $self->{status_attrib};
    my $data              = $self->{data};
    my ($e, $ent, $entry_key);

    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        die "status_attribute [$status_attrib] does not exist on elements of this queue" if (!defined $status_idx);
        $entry_key = $self->_array_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_array_to_key($ent) eq $entry_key) {
                if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->_unacquire_entry($ent);
                print "UNACQUIRED[M]: [", join("|",@$ent), "]\n" if ($verbose);
                last;
            }
        }
    }
    else {
        $entry_key = $self->_hash_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_hash_to_key($ent) eq $entry_key) {
                if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->_unacquire_entry($ent);
                print "UNACQUIRED[M]: {", join("|",%$ent), "}\n" if ($verbose);
                last;
            }
        }
    }

    &App::sub_exit() if ($App::trace);
}

sub _unacquire_entry {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# locate()
#############################################################################

=head2 locate()

    * Signature: @entries = $q->locate();
    * Signature: @entries = $q->locate($params);
    * Signature: @entries = $q->locate($params, $options);
    * Param:     $params         HASH
    * Param:     $options        HASH
    * Return:    @entries

    Sample Usage: 

    my @entries = $q->locate();

=cut

sub locate {
    &App::sub_entry if ($App::trace);
    my ($self, $params, $options) = @_;
    my @entries = ();
    die "locate() not implemented";
    &App::sub_exit(@entries) if ($App::trace);
    return(@entries);
}

sub _locate_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $params, $options) = @_;

    my (@entries, @param_name, @param_value, $p, $matches);
    if ($params && %$params) {
        @param_name = (keys %$params);
        for ($p = 0; $p <= $#param_name; $p++) {
            $param_value[$p] = $params->{$param_name[$p]};
        }
    }

    my $entries = $self->{data};

    if ($#param_name == -1) {
        @entries = @$entries;
    }
    else {
        if ($self->{type} eq "ARRAY") {
            my $colidx = $self->_colidx();
            my (@param_idx);
            for ($p = 0; $p <= $#param_name; $p++) {
                $param_idx[$p] = $colidx->{$param_name[$p]};
            }
            foreach my $e (@$entries) {
                $matches = 1;
                for ($p = 0; $p <= $#param_name; $p++) {
                    if ($e->[$param_idx[$p]] ne $param_value[$p]) {
                        $matches = 0;
                        last;
                    }
                }
                CORE::push(@entries, $e) if ($matches);
            }
        }
        else {
            foreach my $e (@$entries) {
                $matches = 1;
                for ($p = 0; $p <= $#param_name; $p++) {
                    if ($e->{$param_name[$p]} ne $param_value[$p]) {
                        $matches = 0;
                        last;
                    }
                }
                CORE::push(@entries, $e) if ($matches);
            }
        }
    }

    &App::sub_exit(@entries) if ($App::trace);
    return(@entries);
}

#############################################################################
# update()
#############################################################################

=head2 update()

    * Signature: $q->update($entry, $columns, $values);
    * Param:     $entry          HASH/ARRAY
    * Param:     $columns        ARRAY
    * Param:     $values         ARRAY
    * Return:    @entries

    Sample Usage: 

    my @entries = $q->update();

=cut

sub update {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values) = @_;
    die "update() not implemented";
    &App::sub_exit() if ($App::trace);
}

sub _update_ref {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values, $one_way, $raw) = @_;

    my $status_attrib = $self->{status_attrib};
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        for (my $c = 0; $c <= $#$columns; $c++) {
            if (!$raw && $columns->[$c] eq $status_attrib) {
                $self->_maintain_status_counts(1, $values->[$c], $one_way ? undef : $entry->[$status_idx]);
            }
            $entry->[$colidx->{$columns->[$c]}] = $values->[$c];
        }
    }
    else {
        for (my $c = 0; $c <= $#$columns; $c++) {
            if (!$raw && $columns->[$c] eq $status_attrib) {
                $self->_maintain_status_counts(1, $values->[$c], $one_way ? undef : $entry->{$status_attrib});
            }
            $entry->{$columns->[$c]} = $values->[$c];
        }
    }

    &App::sub_exit(1) if ($App::trace);
    return 1;
}

sub _maintain_status_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $count, $status, $from_status) = @_;
    if (ref($status)) {
        my $entry = $status;
        my $status_attrib = $self->{status_attrib};
        if ($self->{type} eq "ARRAY") {
            my $colidx = $self->_colidx();
            $status = $entry->[$colidx->{$status_attrib}];
        }
        else {
            $status = $entry->{$status_attrib};
        }
    }
    my $status_counts = $self->{status_counts};
    $status_counts->{$status} += $count;
    if ($from_status) {
        $status_counts->{$from_status} -= $count;
    }
    else {
        $status_counts->{ALL} += $count;
    }
    &App::sub_exit() if ($App::trace);
}

sub schedule_entry_acquisition {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $max_events, $event_args) = @_;
    $max_events ||= 1;
    $event_args ||= {};
    $self->{acquisition_event} = $event;
    my $context = $self->{context};
    $context->extend_event_loop($self, "dispatch_events", [$max_events, $event_args]);
    &App::sub_exit() if ($App::trace);
}

sub dispatch_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events, $event_args) = @_;
    my $context = $self->{context};
    $max_events = 1 if (!$max_events);
    my $num_events = 0;
    if ($self->{acquisition_event}) {
        my $context = $self->{context};
        my ($entry, %event);
        while ($num_events < $max_events) {
            $entry = $self->acquire();
            ### $entry will be undef if we don't have any entries with throttle (resource) availability
            last if (!$entry);
            $num_events++;
            %event = %{$self->{acquisition_event}};
            ### TODO: figure out how to abstract out the app_type part of this!!!
            #$event{args} = [ $self->{acquisition_event}{app_type}, $entry ];
            if ($event_args) {
                $event{args} = [ $entry, $event_args ];
            }
            $context->profile_start(sprintf("dispatch_events : %s %s", $event{name}, $event{method})) if $context->{poe_profile};
            $context->send_event(\%event);
            $context->profile_stop(sprintf("dispatch_events : %s %s", $event{name}, $event{method})) if $context->{poe_profile};
        }
    }
    &App::sub_exit($num_events) if ($App::trace);
    return($num_events);
}

#############################################################################
# print()
#############################################################################

=head2 print()

    * Signature: $q->print($fh);
    * Param:     $fh                FILEHANDLE
    * Return:    void

    Sample Usage: 

    $q->print(\*STDOUT);

=cut

sub print {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh = \*STDOUT if (!$fh);
    print $fh "ENTRIES:\n";
    $self->print_entries($fh);
    print $fh "CONSTRAINTS:\n";
    $self->print_constraints($fh);
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# print_constraints()
#############################################################################

=head2 print_constraints()

    * Signature: $q->print_constraints();
    * Signature: $q->print_constraints($fh);
    * Param:     $fh                FILEHANDLE
    * Return:    void

    Sample Usage: 

    $q->print_constraints();
    $q->print_constraints(\*STDOUT);

=cut

sub print_constraints {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh = \*STDOUT if (!$fh);

    foreach my $c (@{$self->{global_constraints}}) {
        printf $fh "     GLOBAL CONSTRAINT: %4d/%4d [%s]\n", $c->[0]{$c->[2]}, $c->[1]{$c->[3]}, $c->[4];
    }
    foreach my $c (@{$self->{constraints}}) {
        my (%seen);
        printf $fh "   -----------------------------\n";
        foreach my $key (sort keys %{$c->[1]}) {
            $seen{$key} = 1;
            printf $fh "   %8s CONSTRAINT: %4d/%4d [%s]\n", $key, $c->[0]{$key}, $c->[1]{$key}, $c->[2];
        }
        foreach my $key (sort keys %{$c->[0]}) {
            if (!$seen{$key}) {
                $seen{$key} = 1;
                printf $fh "   %8s CONSTRAINT: %4d/%4d [%s]\n", $key, $c->[0]{$key}, $c->[1]{$key}, $c->[2];
            }
        }
    }

    &App::sub_exit() if ($App::trace);
}

#############################################################################
# print_entries()
#############################################################################

=head2 print_entries()

    * Signature: $q->print_entries($fh, $format, $columns);
    * Param:     $fh                FILEHANDLE
    * Param:     $format            string
    * Param:     $columns              ARRAY
    * Return:    void

    Sample Usage: 

    $q->print_entries(\*STDOUT, "%s %s %s", ["ip","hostname","path"]);

=cut

sub print_entries {
    &App::sub_entry if ($App::trace);
    my ($self, $fh, $format, $columns) = @_;
    die "print_entries() not implemented";
    &App::sub_exit() if ($App::trace);
}

sub _print_entries_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $fh, $format, $columns) = @_;
    $fh = \*STDOUT if (!$fh);

    my ($entry);
    my $entries = $self->{data};

    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my (@idx, $i);
        for ($i = 0; $i <= $#$columns; $i++) {
            $idx[$i] = $colidx->{$columns->[$i]};
        }
        foreach my $entry (@$entries) {
            if ($format) {
                printf $fh $format, @{$entry}[@idx];
            }
            else {
                print $fh "   [", join("|",@$entry), "]\n";
            }
        }
    }
    else {
        foreach my $entry (@$entries) {
            if ($format) {
                printf $fh $format, @{$entry}{@$columns};
            }
            else {
                print $fh "   {", join("|",%$entry), "}\n";
            }
        }
    }

    &App::sub_exit() if ($App::trace);
}

sub print_status_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh = \*STDOUT if (!$fh);
    my $status_counts = $self->{status_counts};
    foreach my $status (sort keys %$status_counts) {
        printf $fh "   %7s => %8d\n", $status, $status_counts->{$status};
    }
    &App::sub_exit() if ($App::trace);
}

sub print_resource_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh = \*STDOUT if (!$fh);
    my $resource_counts = $self->_resource_counts();
    foreach my $resource_key (sort keys %{$resource_counts->{total}}) {
        printf $fh "   %9s => %8d=tot %8d=buf %8d=BUFSIZE\n",
            "[$resource_key]",
            $resource_counts->{total}{$resource_key},
            $resource_counts->{buffer}{$resource_key},
            $self->{BUFFER_SIZE};
    }
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# num_entries()
#############################################################################

=head2 num_entries()

    * Signature: $q->num_entries();
    * Signature: $q->num_entries($status);
    * Param:     $status         string
    * Return:    void

    Sample Usage: 

    $num = $q->num_entries();

=cut

sub num_entries {
    &App::sub_entry if ($App::trace);
    my ($self, $status) = @_;
    my $num = 0;
    die "num_entries() not implemented";
    &App::sub_exit($num) if ($App::trace);
    return($num);
}

sub _num_entries_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $status) = @_;
    my $entries = $self->{data};
    my $num = $#$entries + 1;
    if ($status) {
        $num = 0;
        my $status_attrib     = $self->{status_attrib};
        if ($self->{type} eq "ARRAY") {
            my $colidx = $self->_colidx();
            my $status_idx = $colidx->{$status_attrib};
            foreach my $entry (@$entries) {
                $num ++ if ($entry->[$status_idx] eq $status);
            }
        }
        else {
            foreach my $entry (@$entries) {
                $num ++ if ($entry->{$status_attrib} eq $status);
            }
        }
    }
    &App::sub_exit($num) if ($App::trace);
    return($num);
}

sub _num_entries_from_status_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $status) = @_;
    my $status_counts = $self->{status_counts};
    my ($num);
    if ($status) {
        $num = $status_counts->{$status};
    }
    else {
        $num = $status_counts->{$self->{STATUS_UNBUFFERED}};
        $num += $status_counts->{$self->{STATUS_UNACQUIRED}}
            if ($self->{STATUS_UNBUFFERED} ne $self->{STATUS_UNACQUIRED});
    }
    $num = 0 if (!defined $num);
    &App::sub_exit($num) if ($App::trace);
    return($num);
}

#############################################################################
# set_global_constraint()
#############################################################################

=head2 set_global_constraint()

    * Signature: $q->set_global_constraint($counts, $limits, $count_attrib, $limit_attrib);
    * Param:     $counts            HASH
    * Param:     $limits            HASH
    * Param:     $count_attrib      string
    * Param:     $limit_attrib      string
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });
    my %state = (
        count => 0,
        limit => 20,
    );
    $q->set_global_constraint(\%state, \%state, "count", "limit");

=cut

sub set_global_constraint {
    &App::sub_entry if ($App::trace);
    my ($self, $counts, $limits, $count_attrib, $limit_attrib, $value_attrib, $hard_limit) = @_;

    my $global_constraints = $self->{global_constraints};
    if (!$global_constraints) {
        $global_constraints = [];
        $self->{global_constraints} = $global_constraints;
    }

    CORE::push(@$global_constraints, [ $counts, $limits, $count_attrib, $limit_attrib, $value_attrib, $hard_limit ]);

    &App::sub_exit() if ($App::trace);
}

#############################################################################
# set_constraint()
#############################################################################

=head2 set_constraint()

    * Signature: $q->set_constraint($counts, $limits, $key_attrib, $count_attrib);
    * Signature: $q->set_constraint($counts, $limits, $key_attrib);
    * Param:     $key_attrib        string
    * Param:     $count_attrib      string
    * Param:     $counts            HASH
    * Param:     $limits            HASH
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });
    my (%counts);
    my %limits = (
    );
    $q->set_constraint(\%counts, \%limits, "degrees");

=cut

sub set_constraint {
    &App::sub_entry if ($App::trace);
    my ($self, $counts, $limits, $key_attrib, $count_attrib) = @_;

    my $constraints = $self->{constraints};
    if (!$constraints) {
        $constraints = [];
        $self->{constraints} = $constraints;
    }
     
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $key_idx = $colidx->{$key_attrib};
        die "key_attribute [$key_attrib] does not exist on elements of this queue" if (!defined $key_idx);
        my ($count_idx);
        if ($count_attrib) {
            $count_idx = $colidx->{$key_attrib};
            die "count_attribute [$count_attrib] does not exist on elements of this queue" if (!defined $count_idx);
        }
        CORE::push(@$constraints, [ $counts, $limits, $key_attrib, $count_attrib, $key_idx, $count_idx ]);
    }
    else {
        CORE::push(@$constraints, [ $counts, $limits, $key_attrib, $count_attrib ]);
    }

    &App::sub_exit() if ($App::trace);
}

#############################################################################
# clear_constraints()
#############################################################################

=head2 clear_constraints()

    * Signature: $q->clear_constraints();
    * Param:     void
    * Return:    void

    Sample Usage: 

    $q->clear_constraints();

=cut

sub clear_constraints {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    delete $self->{constraints};

    &App::sub_exit() if ($App::trace);
}

#############################################################################
# count_entries_by_attrib()
#############################################################################

=head2 count_entries_by_attrib()

    * Signature: $q->count_entries_by_attrib($attrib, $counts);
    * Param:     $attrib            string
    * Param:     $counts            HASH
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });
    my (%counts);
    $q->count_entries_by_attrib("degrees", \%counts);

=cut

sub count_entries_by_attrib {
    &App::sub_entry if ($App::trace);
    my ($self, $key_attrib, $counts, $count_attrib) = @_;
    die "count_entries_by_attrib() not implemented";
    &App::sub_exit() if ($App::trace);
}

sub _count_entries_by_attrib_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $key_attrib, $counts, $count_attrib) = @_;
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        die "key_attribute [$key_attrib] does not exist on elements of this queue" if (!exists $colidx->{$key_attrib});
        my $i = $colidx->{$key_attrib};
        if ($count_attrib) {
            my $count_i = $colidx->{$count_attrib};
            die "count_attribute [$count_attrib] does not exist on elements of this queue" if (! defined $count_i);
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->[$i]} += $entry->[$count_i];
            }
        }
        else {
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->[$i]} ++;
            }
        }
    }
    else {
        if ($count_attrib) {
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->{$key_attrib}} += $entry->{$count_attrib};
            }
        }
        else {
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->{$key_attrib}} ++;
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# PROTECTED METHODS
#############################################################################

=head1 Protected Methods:

=cut

#############################################################################
# Method: _colidx()
#############################################################################

=head2 _colidx()

Returns the column-to-index hashref.

    * Signature: $colidx = $q->_colidx();
    * Param:     void
    * Return:    $colidx      HASH

    $colidx = $q->_colidx();
    $idx = $colidx->{$column};   # get the column index for a named $column

=cut

sub _colidx {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $colidx = $self->{colidx};
    if (!$colidx && $self->{table}) {
        my $columns = $self->{columns} || die "columns of the queue elements must be supplied in order to know the column indexes";
        $colidx = {};
        for (my $i = 0; $i <= $#$columns; $i++) {
            $colidx->{$columns->[$i]} = $i;
        }
        $self->{colidx} = $colidx;
    }
    &App::sub_exit($colidx) if ($App::trace);
    return($colidx);
}

#############################################################################
# _global_resources_exist()
#############################################################################

=head2 _global_resources_exist()

    * Signature: $q->_global_resources_exist();
    * Param:     $constraints       ARRAY
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();

    $q->_global_resources_exist();

=cut

sub _global_resources_exist {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $complies = 1;
    my $global_constraints = $self->{global_constraints};
    if (!$global_constraints || $#$global_constraints == -1) {
        # do nothing
    }
    else {
        my $context = $self->{context};
        my ($limit);
        foreach my $c (@$global_constraints) {
            $limit = $c->[$GCONSTR_LIMITS]{$c->[$GCONSTR_LIMIT_ATTRIB]};
            $limit = -$limit if ($limit < 0);
            if ($c->[$GCONSTR_COUNTS]{$c->[$GCONSTR_COUNT_ATTRIB]} >= $limit) {
                $complies = 0;
                last;
            }
        }
    }
    &App::sub_exit($complies) if ($App::trace);
    return($complies);
}

#############################################################################
# _acquire_resources()
#############################################################################

=head2 _acquire_resources()

    * Signature: $q->_acquire_resources($entry, $constraints);
    * Param:     $entry             ARRAY/HASH
    * Param:     $constraints       ARRAY
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });

    $q->_acquire_resources($entry);
    $q->_acquire_resources($entry, $constraints);

=cut

sub _acquire_resources {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $constraints) = @_;
    my $complies = 1;
    my $context = $self->{context};

    my $global_constraints = $self->{global_constraints};
    if (!$global_constraints || $#$global_constraints == -1) {
        # do nothing
    }
    else {
        my ($limit);
        foreach my $c (@$global_constraints) {
            $limit = $c->[$GCONSTR_LIMITS]{$c->[$GCONSTR_LIMIT_ATTRIB]};
            $limit = -$limit if ($limit < 0);
            if ($c->[$GCONSTR_COUNTS]{$c->[$GCONSTR_COUNT_ATTRIB]} >= $limit) {
                $complies = 0;
                last;
            }
        }
    }

    $constraints ||= $self->{constraints};
    # print "_acquire_resources(): complies=[$complies] constraints=[$constraints] #constraints=[$#$constraints]\n";
    if (!$complies || !$constraints || $#$constraints == -1) {
        # do nothing
    }
    else {
        my ($key, $count_incr, $limit);

        if ($self->{type} eq "ARRAY") {
            foreach my $c (@$constraints) {
                $key = $entry->[$c->[$CONSTR_KEY_IDX]];
                $limit = $c->[$CONSTR_LIMITS]{$key};
                $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
                if (defined $limit) {
                    $limit = -$limit if ($limit < 0);
                    $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? $entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
                    if ($c->[$CONSTR_COUNTS]{$key} + $count_incr > $limit) {
                        $complies = 0;
                        last;
                    }
                }
            }
            if ($complies) {
                foreach my $c (@$constraints) {
                    $key = $entry->[$c->[$CONSTR_KEY_IDX]];
                    $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? $entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
                    $c->[$CONSTR_COUNTS]{$key} += $count_incr;
                }
            }
        }
        else {
            foreach my $c (@$constraints) {
                $key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
                $limit = $c->[$CONSTR_LIMITS]{$key};
                $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
                if (defined $limit) {
                    $limit = -$limit if ($limit < 0);
                    $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? $entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
                    if ($c->[$CONSTR_COUNTS]{$key} + $count_incr > $limit) {
                        $complies = 0;
                        last;
                    }
                }
            }
            if ($complies) {
                foreach my $c (@$constraints) {
                    $key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
                    $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? $entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
                    $c->[$CONSTR_COUNTS]{$key} += $count_incr;
                }
            }
        }
    }

    if ($complies) {
        foreach my $c (@$global_constraints) {
            $c->[$GCONSTR_COUNTS]{$c->[$GCONSTR_COUNT_ATTRIB]} ++;
        }
    }

    &App::sub_exit($complies) if ($App::trace);
    return($complies);
}

#############################################################################
# _release_resources()
#############################################################################

=head2 _release_resources()

    * Signature: $q->_release_resources($entry, $constraints);
    * Param:     $entry             ARRAY/HASH
    * Param:     $constraints       ARRAY
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });

    $q->_release_resources($entry);
    $q->_release_resources($entry, $constraints);

=cut

sub _release_resources {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $constraints) = @_;

    my $global_constraints = $self->{global_constraints};
    if (!$global_constraints || $#$global_constraints == -1) {
        # do nothing
    }
    else {
        foreach my $c (@$global_constraints) {
            $c->[$GCONSTR_COUNTS]{$c->[$GCONSTR_COUNT_ATTRIB]} --;
        }
    }

    $constraints ||= $self->{constraints};
    if (!$constraints || $#$constraints == -1) {
        # do nothing
    }
    else {
        my ($key, $count_incr, $limit);
        if ($self->{type} eq "ARRAY") {
            foreach my $c (@$constraints) {
                $key = $entry->[$c->[$CONSTR_KEY_IDX]];
                $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? $entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
                $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
            }
        }
        else {
            foreach my $c (@$constraints) {
                $key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
                $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? $entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
                $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
            }
        }
    }
    &App::sub_exit(1) if ($App::trace);
    return(1);
}

sub _resource_counts {
    my ($self) = @_;
    my $resource_counts = $self->{resource_counts};
    if (!$resource_counts) {
        $resource_counts = $self->_initialize_resource_counts();
        $self->{resource_counts} = $resource_counts;
    }
    return($resource_counts);
}

sub _initialize_resource_counts {
    my ($self) = @_;
    my $resource_counts = {
        total => {},
        buffer => {},
    };
    return($resource_counts);
}

#############################################################################
# _maintain_queue_buffers()
#############################################################################

=head2 _maintain_queue_buffers()

    * Signature: $q->_maintain_queue_buffers($add, $entry);
    * Param:     $op                [push,acquire,release,unacquire]
    * Param:     $entry             ARRAY/HASH
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });

    $q->_maintain_queue_buffers(1,$entry);
    $q->_maintain_queue_buffers(-1,$entry);
    $q->_maintain_queue_buffers();

Queues may be implemented with remote storage.  In that case, there are local
queue buffers which are maintained in memory to increase performance.
There is conceptually a queue buffer for each combination of constraint values
(the constraint key of an entry).

The Queue Buffers are maintained each time an entry is push()ed release()d.
The count for the particular constraint key is modified.
If the count falls below a configured low-water mark, a new set of entries 
are read in from the remote storage.

=cut

sub _maintain_queue_buffers {
    &App::sub_entry if ($App::trace);
    my ($self, $op, $entry, $columns, $values) = @_;
    my $context = $self->{context};

    $op ||= "";
    my $BUFFER_SIZE = $self->{BUFFER_SIZE};

    if ($entry) {
        my $resource_key = $self->_resource_key($entry);
        my $resource_counts = $self->_resource_counts();

        if ($op eq "push") {
            $resource_counts->{total}{$resource_key}++;
            if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
                $resource_counts->{buffer}{$resource_key}++;
                $self->_push_in_mem($entry);
            }
            else {
                $self->_push_in_mem($entry,1); # release lowest
            }
        }
        elsif ($op eq "acquire") {
            $resource_counts->{total}{$resource_key}--;
            $resource_counts->{buffer}{$resource_key}--;
        }
        elsif ($op eq "release") {
            my $status_attrib = $self->{status_attrib};
            my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
            my $release_without_acquire = 0;
            if ($self->{type} eq "ARRAY") {
                my $colidx = $self->_colidx();
                my $status_idx = $colidx->{$status_attrib};
                if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
                    $release_without_acquire = 1;
                }
            }
            else {
                if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
                    $release_without_acquire = 1;
                }
            }
            ### TODO: figure out how to maintain numbers when $released is false, causing constraint issues
            my $released = $self->_release_in_mem($entry, $columns, $values);
            if ($released) {
                if ($release_without_acquire) {
                    $resource_counts->{total}{$resource_key}--;
                    $resource_counts->{buffer}{$resource_key}--;
                }
                else {
                    # do nothing
                }
            }
            else {
                my $nrows = $self->_release_in_db($entry,$columns,$values);
                if ($release_without_acquire) {
                    $resource_counts->{total}{$resource_key}--;
                }
                else {
                    # do nothing
                }
                my $context = $self->{context};
                $context->log({level=>1},"Released something not in memory entry=[$entry] nrows=[$nrows] release_without_acquire=[$release_without_acquire]\n");
            }
        }
        elsif ($op eq "unacquire") {
            $resource_counts->{total}{$resource_key}++;
            $resource_counts->{buffer}{$resource_key}++;
        }
        my $num_total       = $resource_counts->{total}{$resource_key};
        my $num_in_buffer   = $resource_counts->{buffer}{$resource_key};
        if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
            my $num_added = $self->_refill_buffer($resource_key);
            $resource_counts->{buffer}{$resource_key} += $num_added;
        }
    }
    else {
        my $resource_counts = $self->_resource_counts();
        foreach my $resource_key (keys %{$resource_counts->{total}}) {
            my $num_total       = $resource_counts->{total}{$resource_key};
            my $num_in_buffer   = $resource_counts->{buffer}{$resource_key};
            if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
                my $num_added = $self->_refill_buffer($resource_key);
                $resource_counts->{buffer}{$resource_key} += $num_added;
            }
        }
    }

    &App::sub_exit() if ($App::trace);
}

#############################################################################
# _resource_key()
#############################################################################

=head2 _resource_key()

    * Signature: $q->_resource_key($entry);
    * Param:     $entry             ARRAY/HASH
    * Return:    undef

    Sample Usage: 

    $context = App->context();
    $q = $context->service("WorkQueue");  # or ...
    $q = $context->work_queue();
    $q->push({ name => "Joe", degrees => 3 });
    $q->push({ name => "Mike", degrees => 1 });

    $resource_key = $q->_resource_key($entry);

=cut

sub _resource_key {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;

    my $resource_key = "";
    my $constraints = $self->{constraints};
    if (!$constraints || $#$constraints == -1) {
        # do nothing
    }
    else {
        my (@resource_key, $key, $count_incr, $limit);
        if ($self->{type} eq "ARRAY") {
            foreach my $c (@$constraints) {
                $key = $entry->[$c->[$CONSTR_KEY_IDX]];
                $key = "" if (!defined $key);
                CORE::push(@resource_key, $key);
            }
        }
        else {
            foreach my $c (@$constraints) {
                $key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
                $key = "" if (!defined $key);
                CORE::push(@resource_key, $key);
            }
        }
        $resource_key = join(":",@resource_key);
    }
    &App::sub_exit($resource_key) if ($App::trace);
    return($resource_key);
}

sub _resource_key_to_params {
    &App::sub_entry if ($App::trace);
    my ($self, $resource_key) = @_;

    my $params = {};
    my $constraints = $self->{constraints};
    if (!$constraints || $#$constraints == -1) {
        # do nothing
    }
    else {
        my @resource_key = split(/:/, $resource_key);
        for (my $i = 0; $i <= $#$constraints; $i++) {
            $params->{$constraints->[$i][$CONSTR_KEY_ATTRIB]} = $resource_key[$i];
        }
    }
    &App::sub_exit($params) if ($App::trace);
    return($params);
}

#############################################################################
# Method: service_type()
#############################################################################

=head2 service_type()

Returns "WorkQueue";

    * Signature: $service_type = App::WorkQueue->service_type();
    * Param:     void
    * Return:    $service_type  string

    $service_type = $serializer->service_type();

=cut

sub service_type () { "WorkQueue"; }

=head1 ACKNOWLEDGEMENTS

 * (c) 2010 Stephen Adkins
 * Author:  Stephen Adkins <spadkins@gmail.com>
 * License: This is free software. It is licensed under the same terms as Perl itself.

=head1 SEE ALSO

L<C<App::Context>|App::Context>,
L<C<App::Service>|App::Service>

=cut

1;








